1   package org.apache.solr.cloud;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements.  See the NOTICE file distributed with
6    * this work for additional information regarding copyright ownership.
7    * The ASF licenses this file to You under the Apache License, Version 2.0
8    * (the "License"); you may not use this file except in compliance with
9    * the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  import org.apache.solr.SolrTestCaseJ4;
21  import org.apache.solr.client.solrj.SolrResponse;
22  import org.apache.solr.client.solrj.response.QueryResponse;
23  import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
24  import org.apache.solr.cloud.Overseer.LeaderStatus;
25  import org.apache.solr.common.cloud.ClusterState;
26  import org.apache.solr.common.cloud.SolrZkClient;
27  import org.apache.solr.common.cloud.ZkNodeProps;
28  import org.apache.solr.common.cloud.ZkStateReader;
29  import org.apache.solr.common.params.CollectionParams;
30  import org.apache.solr.common.params.CoreAdminParams;
31  import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
32  import org.apache.solr.common.params.ModifiableSolrParams;
33  import org.apache.solr.common.util.StrUtils;
34  import org.apache.solr.common.util.Utils;
35  import org.apache.solr.handler.component.ShardHandler;
36  import org.apache.solr.handler.component.ShardHandlerFactory;
37  import org.apache.solr.handler.component.ShardRequest;
38  import org.apache.solr.handler.component.ShardResponse;
39  import org.apache.solr.util.TimeOut;
40  import org.apache.zookeeper.CreateMode;
41  import org.easymock.Capture;
42  import org.easymock.EasyMock;
43  import org.easymock.IAnswer;
44  import org.junit.After;
45  import org.junit.AfterClass;
46  import org.junit.Before;
47  import org.junit.BeforeClass;
48  import org.junit.Test;
49  import org.slf4j.Logger;
50  import org.slf4j.LoggerFactory;
51  
52  import java.lang.invoke.MethodHandles;
53  import java.util.ArrayList;
54  import java.util.Arrays;
55  import java.util.Collection;
56  import java.util.Collections;
57  import java.util.HashMap;
58  import java.util.HashSet;
59  import java.util.Iterator;
60  import java.util.List;
61  import java.util.Map;
62  import java.util.Map.Entry;
63  import java.util.Queue;
64  import java.util.Set;
65  import java.util.concurrent.ArrayBlockingQueue;
66  import java.util.concurrent.TimeUnit;
67  
68  import static org.easymock.EasyMock.anyBoolean;
69  import static org.easymock.EasyMock.anyObject;
70  import static org.easymock.EasyMock.capture;
71  import static org.easymock.EasyMock.createMock;
72  import static org.easymock.EasyMock.expect;
73  import static org.easymock.EasyMock.expectLastCall;
74  import static org.easymock.EasyMock.getCurrentArguments;
75  import static org.easymock.EasyMock.replay;
76  import static org.easymock.EasyMock.reset;
77  import static org.easymock.EasyMock.verify;
78  
79  public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
80  
81    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
82    
83    private static final String ADMIN_PATH = "/admin/cores";
84    private static final String COLLECTION_NAME = "mycollection";
85    private static final String CONFIG_NAME = "myconfig";
86    
87    private static OverseerTaskQueue workQueueMock;
88    private static DistributedMap runningMapMock;
89    private static DistributedMap completedMapMock;
90    private static DistributedMap failureMapMock;
91    private static ShardHandlerFactory shardHandlerFactoryMock;
92    private static ShardHandler shardHandlerMock;
93    private static ZkStateReader zkStateReaderMock;
94    private static ClusterState clusterStateMock;
95    private static SolrZkClient solrZkClientMock;
96    private final Map zkMap = new HashMap();
97    private final Set collectionsSet = new HashSet();
98    private SolrResponse lastProcessMessageResult;
99  
100 
101   private OverseerCollectionConfigSetProcessorToBeTested underTest;
102   
103   private Thread thread;
104   private Queue<QueueEvent> queue = new ArrayBlockingQueue<>(10);
105 
106   private class OverseerCollectionConfigSetProcessorToBeTested extends
107       OverseerCollectionConfigSetProcessor {
108     
109 
110     public OverseerCollectionConfigSetProcessorToBeTested(ZkStateReader zkStateReader,
111         String myId, ShardHandlerFactory shardHandlerFactory,
112         String adminPath,
113         OverseerTaskQueue workQueue, DistributedMap runningMap,
114         DistributedMap completedMap,
115         DistributedMap failureMap) {
116       super(zkStateReader, myId, shardHandlerFactory, adminPath, new Overseer.Stats(), null, new OverseerNodePrioritizer(zkStateReader, adminPath, shardHandlerFactory), workQueue, runningMap, completedMap, failureMap);
117     }
118     
119     @Override
120     protected LeaderStatus amILeader() {
121       return LeaderStatus.YES;
122     }
123     
124   }
125   
126   @BeforeClass
127   public static void setUpOnce() throws Exception {
128     workQueueMock = createMock(OverseerTaskQueue.class);
129     runningMapMock = createMock(DistributedMap.class);
130     completedMapMock = createMock(DistributedMap.class);
131     failureMapMock = createMock(DistributedMap.class);
132     shardHandlerFactoryMock = createMock(ShardHandlerFactory.class);
133     shardHandlerMock = createMock(ShardHandler.class);
134     zkStateReaderMock = createMock(ZkStateReader.class);
135     clusterStateMock = createMock(ClusterState.class);
136     solrZkClientMock = createMock(SolrZkClient.class);
137 
138   }
139   
140   @AfterClass
141   public static void tearDownOnce() {
142     workQueueMock = null;
143     runningMapMock = null;
144     completedMapMock = null;
145     failureMapMock = null;
146     shardHandlerFactoryMock = null;
147     shardHandlerMock = null;
148     zkStateReaderMock = null;
149     clusterStateMock = null;
150     solrZkClientMock = null;
151   }
152   
153   @Before
154   public void setUp() throws Exception {
155     super.setUp();
156     queue.clear();
157     reset(workQueueMock);
158     reset(runningMapMock);
159     reset(completedMapMock);
160     reset(failureMapMock);
161     reset(shardHandlerFactoryMock);
162     reset(shardHandlerMock);
163     reset(zkStateReaderMock);
164     reset(clusterStateMock);
165     reset(solrZkClientMock);
166     underTest = new OverseerCollectionConfigSetProcessorToBeTested(zkStateReaderMock,
167         "1234", shardHandlerFactoryMock, ADMIN_PATH, workQueueMock, runningMapMock,
168         completedMapMock, failureMapMock);
169     zkMap.clear();
170     collectionsSet.clear();
171   }
172   
173   @After
174   public void tearDown() throws Exception {
175     stopComponentUnderTest();
176     super.tearDown();
177   }
178   
179   protected Set<String> commonMocks(int liveNodesCount) throws Exception {
180 
181     shardHandlerFactoryMock.getShardHandler();
182     expectLastCall().andAnswer(new IAnswer<ShardHandler>() {
183       @Override
184       public ShardHandler answer() throws Throwable {
185         log.info("SHARDHANDLER");
186         return shardHandlerMock;
187       }
188     }).anyTimes();
189     workQueueMock.peekTopN(EasyMock.anyInt(), anyObject(Set.class), EasyMock.anyLong());
190     expectLastCall().andAnswer(new IAnswer<List>() {
191       @Override
192       public List answer() throws Throwable {
193         Object result;
194         int count = 0;
195         while ((result = queue.peek()) == null) {
196           Thread.sleep(1000);
197           count++;
198           if (count > 1) return null;
199         }
200 
201         return Arrays.asList(result);
202       }
203     }).anyTimes();
204 
205     workQueueMock.getTailId();
206     expectLastCall().andAnswer(new IAnswer<Object>() {
207       @Override
208       public Object answer() throws Throwable {
209         Object result = null;
210         Iterator iter = queue.iterator();
211         while(iter.hasNext()) {
212           result = iter.next();
213         }
214         return result==null ? null : ((QueueEvent)result).getId();
215       }
216     }).anyTimes();
217 
218     workQueueMock.peek(true);
219     expectLastCall().andAnswer(new IAnswer<Object>() {
220       @Override
221       public Object answer() throws Throwable {
222         Object result;
223         while ((result = queue.peek()) == null) {
224           Thread.sleep(1000);
225         }
226         return result;
227       }
228     }).anyTimes();
229     
230     workQueueMock.remove(anyObject(QueueEvent.class));
231     expectLastCall().andAnswer(new IAnswer<Object>() {
232       @Override
233       public Object answer() throws Throwable {
234         queue.remove((QueueEvent) getCurrentArguments()[0]);
235         return null;
236       }
237     }).anyTimes();
238     
239     workQueueMock.poll();
240     expectLastCall().andAnswer(new IAnswer<Object>() {
241       @Override
242       public Object answer() throws Throwable {
243         return queue.poll();
244       }
245     }).anyTimes();
246 
247     zkStateReaderMock.getClusterState();
248     expectLastCall().andAnswer(new IAnswer<Object>() {
249       @Override
250       public Object answer() throws Throwable {
251         return clusterStateMock;
252       }
253     }).anyTimes();
254     
255     zkStateReaderMock.getZkClient();
256     expectLastCall().andAnswer(new IAnswer<Object>() {
257       @Override
258       public Object answer() throws Throwable {
259         return solrZkClientMock;
260       }
261     }).anyTimes();
262 
263     zkStateReaderMock.updateClusterState();
264 
265     clusterStateMock.getCollections();
266     expectLastCall().andAnswer(new IAnswer<Object>() {
267       @Override
268       public Object answer() throws Throwable {
269         return collectionsSet;
270       }
271     }).anyTimes();
272     final Set<String> liveNodes = new HashSet<>();
273     for (int i = 0; i < liveNodesCount; i++) {
274       final String address = "localhost:" + (8963 + i) + "_solr";
275       liveNodes.add(address);
276       
277       zkStateReaderMock.getBaseUrlForNodeName(address);
278       expectLastCall().andAnswer(new IAnswer<Object>() {
279         @Override
280         public Object answer() throws Throwable {
281           // This works as long as this test does not use a 
282           // webapp context with an underscore in it
283           return address.replaceAll("_", "/");
284         }
285       }).anyTimes();
286       
287     }
288     zkStateReaderMock.getClusterProps();
289     expectLastCall().andAnswer(new IAnswer<Map>() {
290       @Override
291       public Map answer() throws Throwable {
292         return new HashMap();
293       }
294     });
295 
296     solrZkClientMock.getZkClientTimeout();
297     expectLastCall().andAnswer(new IAnswer<Object>() {
298       @Override
299       public Object answer() throws Throwable {
300         return 30000;
301       }
302     }).anyTimes();
303     
304     clusterStateMock.hasCollection(anyObject(String.class));
305     expectLastCall().andAnswer(new IAnswer<Boolean>() {
306       @Override
307       public Boolean answer() throws Throwable {
308         String key = (String) getCurrentArguments()[0];
309         return collectionsSet.contains(key);
310       }
311     } ).anyTimes();
312 
313 
314     clusterStateMock.getLiveNodes();
315     expectLastCall().andAnswer(new IAnswer<Object>() {
316       @Override
317       public Object answer() throws Throwable {
318         return liveNodes;
319       }
320     }).anyTimes();
321     solrZkClientMock.create(anyObject(String.class), anyObject(byte[].class), anyObject(CreateMode.class), anyBoolean());
322     expectLastCall().andAnswer(new IAnswer<String>() {
323       @Override
324       public String answer() throws Throwable {
325         String key = (String) getCurrentArguments()[0];
326         zkMap.put(key, null);
327         handleCreateCollMessage((byte[]) getCurrentArguments()[1]);
328         return key;
329       }
330     }).anyTimes();
331 
332     solrZkClientMock.makePath(anyObject(String.class), anyObject(byte[].class), anyBoolean());
333     expectLastCall().andAnswer(new IAnswer<String>() {
334       @Override
335       public String answer() throws Throwable {
336         String key = (String) getCurrentArguments()[0];
337         return key;
338       }
339     }).anyTimes();
340 
341     solrZkClientMock.makePath(anyObject(String.class), anyObject(byte[].class), anyObject(CreateMode.class), anyBoolean());
342     expectLastCall().andAnswer(new IAnswer<String>() {
343       @Override
344       public String answer() throws Throwable {
345         String key = (String) getCurrentArguments()[0];
346         return key;
347       }
348     }).anyTimes();
349 
350     solrZkClientMock.exists(anyObject(String.class),anyBoolean());
351     expectLastCall().andAnswer(new IAnswer<Boolean>() {
352       @Override
353       public Boolean answer() throws Throwable {
354         String key = (String) getCurrentArguments()[0];
355         return zkMap.containsKey(key);
356       }
357     }).anyTimes();
358     
359     zkMap.put("/configs/myconfig", null);
360     
361     return liveNodes;
362   }
363 
364   private void handleCreateCollMessage(byte[] bytes) {
365     try {
366       ZkNodeProps props = ZkNodeProps.load(bytes);
367       if(CollectionParams.CollectionAction.CREATE.isEqual(props.getStr("operation"))){
368         String collName = props.getStr("name") ;
369         if(collName != null) collectionsSet.add(collName);
370       }
371     } catch (Exception e) { }
372   }
373 
374   protected void startComponentUnderTest() {
375     thread = new Thread(underTest);
376     thread.start();
377   }
378   
379   protected void stopComponentUnderTest() throws Exception {
380     underTest.close();
381     thread.interrupt();
382     thread.join();
383   }
384   
385   private class SubmitCapture {
386     public Capture<ShardRequest> shardRequestCapture = new Capture<>();
387     public Capture<String> nodeUrlsWithoutProtocolPartCapture = new Capture<>();
388     public Capture<ModifiableSolrParams> params = new Capture<>();
389   }
390   
391   protected List<SubmitCapture> mockShardHandlerForCreateJob(
392       Integer numberOfSlices, Integer numberOfReplica) {
393     List<SubmitCapture> submitCaptures = new ArrayList<>();
394     for (int i = 0; i < (numberOfSlices * numberOfReplica); i++) {
395       SubmitCapture submitCapture = new SubmitCapture();
396       shardHandlerMock.submit(capture(submitCapture.shardRequestCapture),
397           capture(submitCapture.nodeUrlsWithoutProtocolPartCapture),
398           capture(submitCapture.params));
399       expectLastCall();
400       submitCaptures.add(submitCapture);
401       ShardResponse shardResponseWithoutException = new ShardResponse();
402       shardResponseWithoutException.setSolrResponse(new QueryResponse());
403       expect(shardHandlerMock.takeCompletedOrError()).andReturn(
404           shardResponseWithoutException);
405     }
406     expect(shardHandlerMock.takeCompletedOrError()).andReturn(null);
407     return submitCaptures;
408   }
409   
410   protected void issueCreateJob(Integer numberOfSlices,
411       Integer replicationFactor, Integer maxShardsPerNode, List<String> createNodeList, boolean sendCreateNodeList, boolean createNodeSetShuffle) {
412     Map<String,Object> propMap = Utils.makeMap(
413         Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
414         ZkStateReader.REPLICATION_FACTOR, replicationFactor.toString(),
415         "name", COLLECTION_NAME,
416         "collection.configName", CONFIG_NAME,
417         OverseerCollectionMessageHandler.NUM_SLICES, numberOfSlices.toString(),
418         ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode.toString()
419     );
420     if (sendCreateNodeList) {
421       propMap.put(OverseerCollectionMessageHandler.CREATE_NODE_SET,
422           (createNodeList != null)?StrUtils.join(createNodeList, ','):null);
423       if (OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE_DEFAULT != createNodeSetShuffle || random().nextBoolean()) {
424         propMap.put(OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE, createNodeSetShuffle);
425       }
426     }
427 
428     ZkNodeProps props = new ZkNodeProps(propMap);
429     QueueEvent qe = new QueueEvent("id", Utils.toJSON(props), null){
430       @Override
431       public void setBytes(byte[] bytes) {
432         lastProcessMessageResult = SolrResponse.deserialize( bytes);
433       }
434     };
435     queue.add(qe);
436   }
437   
438   protected void verifySubmitCaptures(List<SubmitCapture> submitCaptures,
439       Integer numberOfSlices, Integer numberOfReplica, Collection<String> createNodes, boolean dontShuffleCreateNodeSet) {
440     List<String> coreNames = new ArrayList<>();
441     Map<String,Map<String,Integer>> sliceToNodeUrlsWithoutProtocolPartToNumberOfShardsRunningMapMap = new HashMap<>();
442     List<String> nodeUrlWithoutProtocolPartForLiveNodes = new ArrayList<>(
443         createNodes.size());
444     for (String nodeName : createNodes) {
445       String nodeUrlWithoutProtocolPart = nodeName.replaceAll("_", "/");
446       if (nodeUrlWithoutProtocolPart.startsWith("http://")) nodeUrlWithoutProtocolPart = nodeUrlWithoutProtocolPart
447           .substring(7);
448       nodeUrlWithoutProtocolPartForLiveNodes.add(nodeUrlWithoutProtocolPart);
449     }
450     final Map<String,String> coreName_TO_nodeUrlWithoutProtocolPartForLiveNodes_map = new HashMap<>();
451     
452     for (SubmitCapture submitCapture : submitCaptures) {
453       ShardRequest shardRequest = submitCapture.shardRequestCapture.getValue();
454       assertEquals(CoreAdminAction.CREATE.toString(),
455           shardRequest.params.get(CoreAdminParams.ACTION));
456       // assertEquals(shardRequest.params, submitCapture.params);
457       String coreName = shardRequest.params.get(CoreAdminParams.NAME);
458       assertFalse("Core with name " + coreName + " created twice",
459           coreNames.contains(coreName));
460       coreNames.add(coreName);
461       assertEquals(CONFIG_NAME,
462           shardRequest.params.get("collection.configName"));
463       assertEquals(COLLECTION_NAME,
464           shardRequest.params.get(CoreAdminParams.COLLECTION));
465       assertEquals(numberOfSlices.toString(),
466           shardRequest.params.get(ZkStateReader.NUM_SHARDS_PROP));
467       assertEquals(ADMIN_PATH, shardRequest.params.get("qt"));
468       assertEquals(1, shardRequest.purpose);
469       assertEquals(1, shardRequest.shards.length);
470       assertEquals(submitCapture.nodeUrlsWithoutProtocolPartCapture.getValue(),
471           shardRequest.shards[0]);
472       assertTrue("Shard " + coreName + " created on wrong node "
473           + shardRequest.shards[0],
474           nodeUrlWithoutProtocolPartForLiveNodes
475               .contains(shardRequest.shards[0]));
476       coreName_TO_nodeUrlWithoutProtocolPartForLiveNodes_map.put(coreName, shardRequest.shards[0]);
477       assertEquals(shardRequest.shards, shardRequest.actualShards);
478       
479       String sliceName = shardRequest.params.get(CoreAdminParams.SHARD);
480       if (!sliceToNodeUrlsWithoutProtocolPartToNumberOfShardsRunningMapMap
481           .containsKey(sliceName)) {
482         sliceToNodeUrlsWithoutProtocolPartToNumberOfShardsRunningMapMap.put(
483             sliceName, new HashMap<String,Integer>());
484       }
485       Map<String,Integer> nodeUrlsWithoutProtocolPartToNumberOfShardsRunningMap = sliceToNodeUrlsWithoutProtocolPartToNumberOfShardsRunningMapMap
486           .get(sliceName);
487       Integer existingCount;
488       nodeUrlsWithoutProtocolPartToNumberOfShardsRunningMap
489           .put(
490               shardRequest.shards[0],
491               ((existingCount = nodeUrlsWithoutProtocolPartToNumberOfShardsRunningMap
492                   .get(shardRequest.shards[0])) == null) ? 1
493                   : (existingCount + 1));
494     }
495     
496     assertEquals(numberOfSlices * numberOfReplica, coreNames.size());
497     for (int i = 1; i <= numberOfSlices; i++) {
498       for (int j = 1; j <= numberOfReplica; j++) {
499         String coreName = COLLECTION_NAME + "_shard" + i + "_replica" + j;
500         assertTrue("Shard " + coreName + " was not created",
501             coreNames.contains(coreName));
502         
503         if (dontShuffleCreateNodeSet) {
504           final String expectedNodeName = nodeUrlWithoutProtocolPartForLiveNodes.get((numberOfReplica * (i - 1) + (j - 1)) % nodeUrlWithoutProtocolPartForLiveNodes.size());
505           assertFalse("expectedNodeName is null for coreName="+coreName, null == expectedNodeName);
506           
507           final String actualNodeName = coreName_TO_nodeUrlWithoutProtocolPartForLiveNodes_map.get(coreName);
508           assertFalse("actualNodeName is null for coreName="+coreName, null == actualNodeName);
509 
510           assertTrue("node name mismatch for coreName="+coreName+" ( actual="+actualNodeName+" versus expected="+expectedNodeName+" )", actualNodeName.equals(expectedNodeName));
511         }
512       }
513     }
514     
515     assertEquals(numberOfSlices.intValue(),
516         sliceToNodeUrlsWithoutProtocolPartToNumberOfShardsRunningMapMap.size());
517     for (int i = 1; i <= numberOfSlices; i++) {
518       sliceToNodeUrlsWithoutProtocolPartToNumberOfShardsRunningMapMap.keySet()
519           .contains("shard" + i);
520     }
521     int minShardsPerSlicePerNode = numberOfReplica / createNodes.size();
522     int numberOfNodesSupposedToRunMaxShards = numberOfReplica
523         % createNodes.size();
524     int numberOfNodesSupposedToRunMinShards = createNodes.size()
525         - numberOfNodesSupposedToRunMaxShards;
526     int maxShardsPerSlicePerNode = (minShardsPerSlicePerNode + 1);
527     if (numberOfNodesSupposedToRunMaxShards == 0) {
528       numberOfNodesSupposedToRunMaxShards = numberOfNodesSupposedToRunMinShards;
529       maxShardsPerSlicePerNode = minShardsPerSlicePerNode;
530     }
531     boolean diffBetweenMinAndMaxShardsPerSlicePerNode = (maxShardsPerSlicePerNode != minShardsPerSlicePerNode);
532     
533     for (Entry<String,Map<String,Integer>> sliceToNodeUrlsWithoutProtocolPartToNumberOfShardsRunningMapMapEntry : sliceToNodeUrlsWithoutProtocolPartToNumberOfShardsRunningMapMap
534         .entrySet()) {
535       int numberOfShardsRunning = 0;
536       int numberOfNodesRunningMinShards = 0;
537       int numberOfNodesRunningMaxShards = 0;
538       int numberOfNodesRunningAtLeastOneShard = 0;
539       for (String nodeUrlsWithoutProtocolPart : sliceToNodeUrlsWithoutProtocolPartToNumberOfShardsRunningMapMapEntry
540           .getValue().keySet()) {
541         int numberOfShardsRunningOnThisNode = sliceToNodeUrlsWithoutProtocolPartToNumberOfShardsRunningMapMapEntry
542             .getValue().get(nodeUrlsWithoutProtocolPart);
543         numberOfShardsRunning += numberOfShardsRunningOnThisNode;
544         numberOfNodesRunningAtLeastOneShard++;
545         assertTrue(
546             "Node "
547                 + nodeUrlsWithoutProtocolPart
548                 + " is running wrong number of shards. Supposed to run "
549                 + minShardsPerSlicePerNode
550                 + (diffBetweenMinAndMaxShardsPerSlicePerNode ? (" or " + maxShardsPerSlicePerNode)
551                     : ""),
552             (numberOfShardsRunningOnThisNode == minShardsPerSlicePerNode)
553                 || (numberOfShardsRunningOnThisNode == maxShardsPerSlicePerNode));
554         if (numberOfShardsRunningOnThisNode == minShardsPerSlicePerNode) numberOfNodesRunningMinShards++;
555         if (numberOfShardsRunningOnThisNode == maxShardsPerSlicePerNode) numberOfNodesRunningMaxShards++;
556       }
557       if (minShardsPerSlicePerNode == 0) numberOfNodesRunningMinShards = (createNodes
558           .size() - numberOfNodesRunningAtLeastOneShard);
559       assertEquals(
560           "Too many shards are running under slice "
561               + sliceToNodeUrlsWithoutProtocolPartToNumberOfShardsRunningMapMapEntry
562                   .getKey(),
563           numberOfReplica.intValue(), numberOfShardsRunning);
564       assertEquals(numberOfNodesSupposedToRunMinShards,
565           numberOfNodesRunningMinShards);
566       assertEquals(numberOfNodesSupposedToRunMaxShards,
567           numberOfNodesRunningMaxShards);
568     }
569   }
570   
571   protected void waitForEmptyQueue(long maxWait) throws Exception {
572     final TimeOut timeout = new TimeOut(maxWait, TimeUnit.MILLISECONDS);
573     while (queue.peek() != null) {
574       if (timeout.hasTimedOut())
575         fail("Queue not empty within " + maxWait + " ms");
576       Thread.sleep(100);
577     }
578   }
579   
580   protected enum CreateNodeListOptions {
581     SEND,
582     DONT_SEND,
583     SEND_NULL
584   }
585   protected void testTemplate(Integer numberOfNodes, Integer numberOfNodesToCreateOn, CreateNodeListOptions createNodeListOption, Integer replicationFactor,
586       Integer numberOfSlices, Integer maxShardsPerNode,
587       boolean collectionExceptedToBeCreated) throws Exception {
588     assertTrue("Wrong usage of testTemplate. numberOfNodesToCreateOn " + numberOfNodesToCreateOn + " is not allowed to be higher than numberOfNodes " + numberOfNodes, numberOfNodes.intValue() >= numberOfNodesToCreateOn.intValue());
589     assertTrue("Wrong usage of testTemplage. createNodeListOption has to be " + CreateNodeListOptions.SEND + " when numberOfNodes and numberOfNodesToCreateOn are unequal", ((createNodeListOption == CreateNodeListOptions.SEND) || (numberOfNodes.intValue() == numberOfNodesToCreateOn.intValue())));
590     
591     Set<String> liveNodes = commonMocks(numberOfNodes);
592     List<String> createNodeList = new ArrayList<>();
593     int i = 0;
594     for (String node : liveNodes) {
595       if (i++ < numberOfNodesToCreateOn) {
596         createNodeList.add(node);
597       }
598     }
599     
600     if (random().nextBoolean()) Collections.shuffle(createNodeList, OverseerCollectionMessageHandler.RANDOM);
601     
602     List<SubmitCapture> submitCaptures = null;
603     if (collectionExceptedToBeCreated) {
604       submitCaptures = mockShardHandlerForCreateJob(numberOfSlices,
605           replicationFactor);
606     }
607     
608     replay(workQueueMock);
609     replay(solrZkClientMock);
610     replay(zkStateReaderMock);
611     replay(clusterStateMock);
612     replay(shardHandlerFactoryMock);
613     replay(shardHandlerMock);
614 
615 
616     log.info("clusterstate " + clusterStateMock.hashCode());
617 
618     startComponentUnderTest();
619     
620     final List<String> createNodeListToSend = ((createNodeListOption != CreateNodeListOptions.SEND_NULL) ? createNodeList : null);
621     final boolean sendCreateNodeList = (createNodeListOption != CreateNodeListOptions.DONT_SEND);
622     final boolean dontShuffleCreateNodeSet = (createNodeListToSend != null) && sendCreateNodeList && random().nextBoolean();
623     issueCreateJob(numberOfSlices, replicationFactor, maxShardsPerNode, createNodeListToSend, sendCreateNodeList, !dontShuffleCreateNodeSet);
624     waitForEmptyQueue(10000);
625     
626     if (collectionExceptedToBeCreated) {
627       assertNotNull(lastProcessMessageResult.getResponse().toString(), lastProcessMessageResult);
628     }
629     verify(shardHandlerFactoryMock);
630     verify(shardHandlerMock);
631 
632     if (collectionExceptedToBeCreated) {
633       verifySubmitCaptures(submitCaptures, numberOfSlices, replicationFactor,
634           createNodeList, dontShuffleCreateNodeSet);
635     }
636   }
637     @Test
638   public void testNoReplicationEqualNumberOfSlicesPerNode() throws Exception {
639     Integer numberOfNodes = 4;
640     Integer numberOfNodesToCreateOn = 4;
641     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.DONT_SEND;
642     Integer replicationFactor = 1;
643     Integer numberOfSlices = 8;
644     Integer maxShardsPerNode = 2;
645     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
646         maxShardsPerNode, true);
647   }
648   
649   @Test
650   public void testReplicationEqualNumberOfSlicesPerNode() throws Exception {
651     Integer numberOfNodes = 4;
652     Integer numberOfNodesToCreateOn = 4;
653     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.DONT_SEND;
654     Integer replicationFactor = 2;
655     Integer numberOfSlices = 4;
656     Integer maxShardsPerNode = 2;
657     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
658         maxShardsPerNode, true);
659   }
660   
661   @Test
662   public void testNoReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodes() throws Exception {
663     Integer numberOfNodes = 4;
664     Integer numberOfNodesToCreateOn = 4;
665     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.SEND;
666     Integer replicationFactor = 1;
667     Integer numberOfSlices = 8;
668     Integer maxShardsPerNode = 2;
669     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
670         maxShardsPerNode, true);
671   }
672   
673   @Test
674   public void testReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodes() throws Exception {
675     Integer numberOfNodes = 4;
676     Integer numberOfNodesToCreateOn = 4;
677     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.SEND;
678     Integer replicationFactor = 2;
679     Integer numberOfSlices = 4;
680     Integer maxShardsPerNode = 2;
681     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
682         maxShardsPerNode, true);
683   }
684   
685   @Test
686   public void testNoReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodes() throws Exception {
687     Integer numberOfNodes = 4;
688     Integer numberOfNodesToCreateOn = 4;
689     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.SEND_NULL;
690     Integer replicationFactor = 1;
691     Integer numberOfSlices = 8;
692     Integer maxShardsPerNode = 2;
693     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
694         maxShardsPerNode, true);
695   }
696   
697   @Test
698   public void testReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodes() throws Exception {
699     Integer numberOfNodes = 4;
700     Integer numberOfNodesToCreateOn = 4;
701     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.SEND_NULL;
702     Integer replicationFactor = 2;
703     Integer numberOfSlices = 4;
704     Integer maxShardsPerNode = 2;
705     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
706         maxShardsPerNode, true);
707   }  
708   
709   @Test
710   public void testNoReplicationUnequalNumberOfSlicesPerNode() throws Exception {
711     Integer numberOfNodes = 4;
712     Integer numberOfNodesToCreateOn = 4;
713     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.DONT_SEND;
714     Integer replicationFactor = 1;
715     Integer numberOfSlices = 6;
716     Integer maxShardsPerNode = 2;
717     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
718         maxShardsPerNode, true);
719   }
720   
721   @Test
722   public void testReplicationUnequalNumberOfSlicesPerNode() throws Exception {
723     Integer numberOfNodes = 4;
724     Integer numberOfNodesToCreateOn = 4;
725     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.DONT_SEND;
726     Integer replicationFactor = 2;
727     Integer numberOfSlices = 3;
728     Integer maxShardsPerNode = 2;
729     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
730         maxShardsPerNode, true);
731   }
732   
733   @Test
734   public void testNoReplicationCollectionNotCreatedDueToMaxShardsPerNodeLimit()
735       throws Exception {
736     Integer numberOfNodes = 4;
737     Integer numberOfNodesToCreateOn = 4;
738     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.DONT_SEND;
739     Integer replicationFactor = 1;
740     Integer numberOfSlices = 6;
741     Integer maxShardsPerNode = 1;
742     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
743         maxShardsPerNode, false);
744   }
745   
746   @Test
747   public void testReplicationCollectionNotCreatedDueToMaxShardsPerNodeLimit()
748       throws Exception {
749     Integer numberOfNodes = 4;
750     Integer numberOfNodesToCreateOn = 4;
751     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.DONT_SEND;
752     Integer replicationFactor = 2;
753     Integer numberOfSlices = 3;
754     Integer maxShardsPerNode = 1;
755     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
756         maxShardsPerNode, false);
757   }
758 
759   @Test
760   public void testNoReplicationLimitedNodesToCreateOn()
761       throws Exception {
762     Integer numberOfNodes = 4;
763     Integer numberOfNodesToCreateOn = 2;
764     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.SEND;
765     Integer replicationFactor = 1;
766     Integer numberOfSlices = 6;
767     Integer maxShardsPerNode = 3;
768     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
769         maxShardsPerNode, true);
770   }
771   
772   @Test
773   public void testReplicationLimitedNodesToCreateOn()
774       throws Exception {
775     Integer numberOfNodes = 4;
776     Integer numberOfNodesToCreateOn = 2;
777     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.SEND;
778     Integer replicationFactor = 2;
779     Integer numberOfSlices = 3;
780     Integer maxShardsPerNode = 3;
781     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
782         maxShardsPerNode, true);
783   }
784 
785   @Test
786   public void testNoReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimits()
787       throws Exception {
788     Integer numberOfNodes = 4;
789     Integer numberOfNodesToCreateOn = 3;
790     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.SEND;
791     Integer replicationFactor = 1;
792     Integer numberOfSlices = 8;
793     Integer maxShardsPerNode = 2;
794     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
795         maxShardsPerNode, false);
796   }
797   
798   @Test
799   public void testReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimits()
800       throws Exception {
801     Integer numberOfNodes = 4;
802     Integer numberOfNodesToCreateOn = 3;
803     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.SEND;
804     Integer replicationFactor = 2;
805     Integer numberOfSlices = 4;
806     Integer maxShardsPerNode = 2;
807     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
808         maxShardsPerNode, false);
809   }
810 
811 }